
[iris] LogPusher: background drain thread, self-heal via resolver, never drop on send failures #4866

Merged

rjpower merged 11 commits into main from iris-logpusher-resolver on Apr 17, 2026

Conversation

Collaborator

@rjpower rjpower commented Apr 17, 2026

Rewrite LogPusher with a dedicated drain thread. push() is non-blocking;
the thread wakes on batch_size, flush, close, or flush_interval and sends
what's buffered. Every send failure rebuffers at the head of the key's
deque and backs off — retryable errors additionally invalidate the cached
RPC client so the next attempt re-resolves. Only MAX_LOG_BUFFER_SIZE (10k)
overflow drops entries, oldest-first. Worker builds one pusher in start()
with a resolver that looks up /system/log-server on the controller.
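The resolver itself is not shown in this description; a hedged sketch of what such a worker-side resolver might look like follows. The `list_endpoints(prefix=..., exact=...)` call and the endpoint's `address` field are assumptions based on this PR's text, not the repo's exact API:

```python
# Hypothetical sketch of the worker-side resolver described above.
class WorkerResolverSketch:
    def __init__(self, controller_client):
        self._controller_client = controller_client

    def resolve(self, server_url: str) -> str:
        # server_url is an opaque name like "iris://system/log-server";
        # ask the controller for the concrete address it maps to.
        endpoints = self._controller_client.list_endpoints(prefix=server_url, exact=True)
        if not endpoints:
            raise RuntimeError(f"no endpoint registered for {server_url}")
        return endpoints[0].address
```

Raising on an empty result matters here: per the drain-loop design, a resolver exception rebuffers the batch and retries on the next tick rather than dropping.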

Fixes #4860

Give LogPusher an optional str->str resolver (e.g. resolves
iris://system/log-server via the controller's list_endpoints RPC). Cached
RPC client is invalidated on retryable push errors; the next push
re-resolves. Worker creates one pusher that survives across lifecycle
resets instead of rebuilding on every heartbeat-timeout iteration, which
is what let the silent log-mute bug slip through when heartbeats
recovered after a controller restart.

Fixes #4860
@rjpower rjpower added the agent-generated (Created by automation/agent) label on Apr 17, 2026
Contributor

claude Bot commented Apr 17, 2026

Claude finished @rjpower's task in 5m 24s


Code review

No issues found. Checked for bugs and AGENTS.md compliance.

  • Gather context and review PR diff
  • Launch parallel review agents (bugs + AGENTS.md)
  • Validate flagged issues
  • Post review comment

Notes from the review:

  • Resolver is invoked under _send_lock, so resolve/invalidate/close races are handled.
  • _controller_client is always set before the lifecycle thread starts, so the assert in _resolve_log_service cannot trip on startup.
  • is_retryable_error matches existing repo semantics; non-retryable errors correctly skip invalidation.
  • close() cancels the flush timer, flushes, then acquires _send_lock and nils _client — the new None guard handles the deferred-construction case.
  • Idempotent _attach_log_handler preserves handler/pusher identity and only renames handler.key on re-attach.
  • New tests cover lazy resolve, retryable-error invalidation, non-retryable pass-through, resolver-raising retry, and static-URL eager behavior.


@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 877c3c957d


Comment on lines +501 to +504

```python
self._log_pusher = LogPusher(
    "iris://system/log-server",
    interceptors=log_interceptors,
    resolver=self._resolve_log_service,
```


P1 Badge Keep endpoint resolution off synchronous emit path

Passing resolver=self._resolve_log_service here moves a controller RPC into LogPusher._send, which is invoked inline from push() when a buffer reaches batch_size. Because the worker controller client is created with a 60s default timeout, a controller outage can block the caller thread that emitted the triggering log line for up to a minute. Before this change, endpoint lookup happened in lifecycle attach (not on the log emit path), so noisy task logs could not stall worker/task execution this way.


Comment thread lib/iris/src/iris/log_server/client.py Outdated
rjpower added 2 commits April 17, 2026 09:16
…attach/rename

Move LogPusher construction out of _attach_log_handler and into start()
alongside the controller client. Attach logic becomes one short branch on
(handler is None). Resolver honors the server_url it's given instead of
ignoring it.
Rewrite LogPusher with a dedicated drain thread. push() is non-blocking;
the drain thread wakes on batch_size, flush(), close(), or flush_interval
and sends what's buffered. All send failures re-buffer at the head of
the key's deque — retryable and non-retryable alike. MAX_LOG_BUFFER_SIZE
(10k) is the only path that drops entries, via oldest-first eviction.
@rjpower rjpower changed the title from "[iris] LogPusher: self-heal on log-server failover via resolver callback" to "[iris] LogPusher: background drain thread, self-heal via resolver, never drop on send failures" on Apr 17, 2026
Collaborator Author

rjpower commented Apr 17, 2026

🤖 Specification (required for PRs over ~500 LOC).

Problem

Worker at `lib/iris/src/iris/cluster/worker/worker.py` resolved the log service
only inside `_attach_log_handler`, which runs at the top of each lifecycle
iteration (after `_reset_worker_state`). Once the worker passed `_register`
successfully it stayed in `_serve` indefinitely and would only re-resolve on
the next heartbeat timeout. If a controller restart broke the cached
log-server address but not the heartbeat stream, the worker kept accepting
tasks while its log push channel was dead.

Observed in #4860: worker
`marin-tpu-v5e-serving-4-us-west4-a-20260416-2029-2077ebb7-worker-0` lost
heartbeat at 2026-04-16 22:18:37Z, ran its reset routine, failed to re-resolve
`/system/log-server` twice, then stayed silent for >16 hours of subsequent
job execution. 11 of 128 task ordinals on that worker — including the parent
driver task — produced zero log rows.

Second problem: the old LogPusher did sync RPC sends inline from `push()`
whenever `batch_size` was reached, and silently dropped entries on any
exception. Silent drops are tolerable when logs never matter, but wrong for a
single failure window where we'd rather backlog and retry.

Approach

Single `LogPusher` class, no new public types. Changes:

  • Construction takes an optional `resolver: Callable[[str], str]`. When set,
    `server_url` is an opaque name passed to the resolver; the cached RPC
    client is built lazily and rebuilt on retryable failures.
  • A dedicated daemon thread owns sending. `push()` appends to a per-key
    `deque` under a `Condition` and returns without blocking.
  • Drain loop wakes on: batch-size notify, `flush()` notify, `close()` notify,
    or `flush_interval` timeout. Sends everything buffered in one pass.
  • Every send failure rebuffers at the head of the deque with `1s` backoff.
    Retryable errors (UNAVAILABLE / INTERNAL / DEADLINE_EXCEEDED) additionally
    invalidate the cached RPC client so the next attempt re-resolves.
  • Only `MAX_LOG_BUFFER_SIZE=10_000` total-entry cap can drop — oldest-first
    across keys, logged as a WARN.
  • Worker builds one LogPusher in `start()` with a resolver that calls
    `controller_client.list_endpoints(prefix=server_url, exact=True)`.
    `_attach_log_handler` becomes attach-or-rename-key, idempotent.

Key code

Drain loop:

```python
def _run(self):
    while True:
        with self._cond:
            while not self._closed and self._buffered == 0:
                self._cond.wait(timeout=self._flush_interval)
            if self._closed and self._buffered == 0:
                return
            batch = self._take_batch_locked()
        failed = self._send_batch(batch)
        if not failed:
            continue
        with self._cond:
            self._rebuffer_at_head_locked(failed)
            if self._closed:
                return
            self._cond.wait(timeout=_SEND_FAILURE_BACKOFF_SEC)
```

Send — every failure rebuffers, only retryable failures invalidate:

```python
def _send_batch(self, batch):
    for i, (key, entries) in enumerate(batch):
        try:
            client = self._get_client()
        except Exception as exc:
            logger.warning("LogPusher: endpoint resolution failed: %s", exc)
            return list(batch[i:])
        try:
            client.push_logs(PushLogsRequest(key=key, entries=entries))
        except Exception as exc:
            if is_retryable_error(exc):
                self._invalidate(str(exc))
            return list(batch[i:])
    return []
```

Worker attach:

```python
def _attach_log_handler(self):
    if not self._worker_id or self._log_pusher is None:
        return
    key = worker_log_key(self._worker_id)
    if self._log_handler is None:
        self._log_handler = RemoteLogHandler(self._log_pusher, key=key)
        self._log_handler.setLevel(logging.INFO)
        self._log_handler.setFormatter(logging.Formatter("%(asctime)s %(name)s %(message)s"))
        logging.getLogger().addHandler(self._log_handler)
    else:
        self._log_handler.key = key
```

Tests

`lib/iris/tests/test_remote_log_handler.py` covers:

  • push is non-blocking; drain thread ships batches below and at batch_size
  • flush drains immediately even with a long flush_interval
  • close drains once then exits without hanging
  • MAX_LOG_BUFFER_SIZE overflow drops oldest across keys with a WARN
  • retryable failure invalidates the cached client, re-resolves, delivers
    the rebuffered batch on the next successful send
  • non-retryable failure rebuffers (does NOT drop) and does not invalidate
  • resolver raising is rebuffered and retried on the next tick
  • static-URL pusher retries on the same client without invalidating

`lib/iris/tests/cluster/worker/test_worker.py` covers: attach-once-key-rename
semantics, no-op before worker_id is known.

84/85 of the relevant tests pass locally (1 skip unrelated). Pre-commit
clean. Specific caveat: under a persistent non-retryable send error
(malformed payload), the drain loop backs off at 1s until it trips
MAX_LOG_BUFFER_SIZE. Acceptable per the never-drop directive.

rjpower added 4 commits April 17, 2026 09:32
Replace the fixed 1s wait between failed send attempts with rigging's
ExponentialBackoff (0.5s → 30s, factor 2). A push()-triggered notify
would otherwise cut the wait short and turn a steady stream of failures
into a hot loop. Backoff resets on any successful batch send.
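The policy this commit message describes might look like the following sketch, mirroring the stated parameters; the real class comes from rigging, and this is not its code:

```python
class ExponentialBackoffSketch:
    """Sketch of the stated policy: 0.5 s initial, x2 per failure, 30 s cap."""

    def __init__(self, initial: float = 0.5, maximum: float = 30.0, factor: float = 2.0):
        self._initial = initial
        self._maximum = maximum
        self._factor = factor
        self._current = initial

    def next_wait(self) -> float:
        # Return the current wait, then grow it toward the cap.
        wait = self._current
        self._current = min(self._current * self._factor, self._maximum)
        return wait

    def reset(self) -> None:
        # Called after any successful batch send, per the commit message.
        self._current = self._initial
```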
Collapse per-key deques into one FIFO of (key, entry) pairs. Trimming on
overflow is a single popleft. Rebuffering on send failure is appendleft
of the untried tail in original order. The drain thread groups by key
just before sending — one push_logs RPC per key, same as before.
Drop the resolver-None branches. When no resolver is provided, default to
an identity function; _get_client and _invalidate work the same either
way. Static-URL pushers now heal stuck TCP connections by rebuilding the
RPC client against the same address on retryable failures.
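The identity-function default described here is small enough to sketch directly; `make_resolver` is a hypothetical helper name for illustration:

```python
from typing import Callable, Optional

def make_resolver(resolver: Optional[Callable[[str], str]] = None) -> Callable[[str], str]:
    # With no resolver, server_url is already a concrete address, so the
    # identity function suffices; client caching and invalidation then work
    # the same either way, and invalidation rebuilds the RPC client against
    # the same address, healing stuck TCP connections.
    return resolver if resolver is not None else (lambda url: url)
```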
@rjpower rjpower requested a review from yonromai April 17, 2026 16:44
rjpower added 3 commits April 17, 2026 09:45
Set propagate=False on the module logger and install a direct stderr
handler. Otherwise our own "send failed" warnings would reach the
RemoteLogHandler on the root logger and feed right back into the pusher
we serve — a re-entrant amplification loop during failure storms.

Also remove the duplicate close check after the backoff wait. The outer
retry loop's rebuffer/close branch already handles it; one extra send
attempt on close is fine (close() joins with a generous timeout).
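The re-entrancy guard in the first paragraph amounts to a few lines of logging setup; a sketch, with "iris.log_server.client" as an assumed module logger name:

```python
import logging
import sys

# Keep this module's own "send failed" warnings off the root logger,
# where the RemoteLogHandler would feed them back into the pusher we
# serve during a failure storm.
logger = logging.getLogger("iris.log_server.client")
logger.propagate = False
_stderr_handler = logging.StreamHandler(sys.stderr)
_stderr_handler.setFormatter(logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s"))
logger.addHandler(_stderr_handler)
```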
Compress the class docstring to a 4-sentence sketch — the implementation
detail in the drain-thread bullets was out of place for an API doc.
Fold test_retryable_failure_rebuffers_at_head, test_retryable_error_
invalidates_and_reresolves, test_non_retryable_error_rebuffers_without_
invalidating, test_resolver_raising_is_retried, and test_static_url_
pusher_reconnects_on_retryable_failure into a single parameterized
test_failures_always_deliver_via_retry with four scenarios. The
delivery guarantee is the real claim; per-scenario side effects
(invalidate/no-invalidate, rebuild/same-client) stay as branch-specific
assertions.
Replace ``while True`` + mid-loop close returns with
``while not self._closed``. The close check after rebuffer is gone; the
backoff wait still breaks on close via ``_closed``, and the outer loop
then exits naturally at the next iteration. Semantics unchanged — one
best-effort drain pass on close, then exit.
Contributor

@yonromai yonromai left a comment


Approved with 2 follow-ups called out inline.

  • LogPusher.close() can still close the client before an in-flight send returns.
  • LogPusher.flush() is now non-blocking, which changes the existing task-cleanup contract around final log delivery.

Validation: uv run --group dev python -m pytest tests/test_remote_log_handler.py tests/cluster/worker/test_worker.py -q (51 passed; 1 existing thread-leak warning in test_stop_preserve_containers_does_not_kill_tasks).

Generated with Codex.

Comment thread lib/iris/src/iris/log_server/client.py
Comment thread lib/iris/src/iris/log_server/client.py Outdated
```python
    Non-blocking. For draining on shutdown, use ``close``.
    """
    with self._cond:
        if self._queue:
```
Contributor


P2: flush() is now explicitly non-blocking, but an existing call site still relies on the old synchronous behavior during task cleanup: TaskAttempt._cleanup() uses self._log_pusher.flush() so buffered logs reach the server before cleanup continues (task_attempt.py#L935-L942):

```python
# Flush buffered log entries so they reach the server before the task
# is reported as complete. The pusher is shared across tasks so we
# flush rather than close.
if self._log_pusher is not None:
    try:
        self._log_pusher.flush()
    except Exception as e:
        logger.debug("Failed to flush logs for task %s: %s", self.task_id, e)
```

With this implementation, flush() only notifies the drain thread and returns, so short-lived tasks can finish cleanup with their final logs still queued. Could we preserve blocking flush() semantics, or add a synchronous drain method for the task-cleanup path?

Generated with Codex.

Collaborator Author

rjpower commented Apr 17, 2026

@claude address the PR comments and failing test:

```
=========================== short test summary info ============================
FAILED tests/test_remote_log_handler.py::test_failures_always_deliver_via_retry[resolver_raises] - assert 2 == 1
 +  where 2 = len([<tests.test_remote_log_handler._FakeLogServiceClient object at 0x7f6b9cf82490>, <tests.test_remote_log_handler._FakeLogServiceClient object at 0x7f6b9cf82590>])
FAILED tests/test_remote_log_handler.py::test_failures_always_deliver_via_retry[non_retryable] - assert 6 == 1
 +  where 6 = len([<tests.test_remote_log_handler._FakeLogServiceClient object at 0x7f6bc46e1ad0>, <tests.test_remote_log_handler._FakeLogServiceClient object at 0x7f6bc47cecd0>, <tests.test_remote_log_handler._FakeLogServiceClient object at 0x7f6bc7f20750>, <tests.test_remote_log_handler._FakeLogServiceClient object at 0x7f6b7a9b8850>, <tests.test_remote_log_handler._FakeLogServiceClient object at 0x7f6bc7f298d0>, <tests.test_remote_log_handler._FakeLogServiceClient object at 0x7f6b59eb1c50>])
====== 2 failed, 1945 passed, 3 skipped, 41 warnings in 378.81s (0:06:18) ======
```

Let's make flush() block, it's fine for close() to be best-effort, ignore that PR comment.

Contributor

claude Bot commented Apr 17, 2026

Claude finished @rjpower's task in 15m 19s


Done — bc7c9668f pushed to iris-logpusher-resolver

  • Read current LogPusher and tests to understand state
  • Reproduce failing tests locally (4 scenarios all passed in isolation — CI failures were load-dependent races on the _wait_for polling)
  • Make flush() block until queued entries are processed
  • Update tests to use blocking flush for deterministic synchronization
  • Run flake-finder (260/260 passed over 20× runs)
  • Run worker tests (38/38 passed) + pre-commit (clean)
  • Commit and push

What changed

lib/iris/src/iris/log_server/client.py

  • Queue entries now carry a monotonic per-entry seq: deque[tuple[int, str, LogEntry]].
  • New _pushed_seq / _processed_seq counters guarded by _cond.
  • flush(timeout: float | None = None) -> bool snapshots _pushed_seq and waits on the condvar until _processed_seq >= target. Returns True if drained, False on timeout. Default timeout=None waits indefinitely.
  • Drain thread advances _processed_seq and calls notify_all() after each successful send.
  • _trim_oldest_locked also advances _processed_seq for overflow-dropped entries, so a flush can't wait forever on entries that will never reach the server.
  • _send_items now returns (max_sent_seq, unsent_items) so the drain knows the high-water mark across partial success.
  • close() stays best-effort (per directive — ignored the P1 close-vs-send race comment).
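The bullets above condense into roughly the following sketch; the names and structure are assumed from the bullet list, not copied from the repo:

```python
import threading
from collections import deque

class SeqFlushSketch:
    """Illustrative blocking flush via per-entry sequence numbers."""

    def __init__(self):
        self._cond = threading.Condition()
        self._pushed_seq = 0      # seq of the newest enqueued entry
        self._processed_seq = 0   # seq of the newest sent-or-dropped entry
        self._queue = deque()     # (seq, key, entry)

    def push(self, key, entry):
        with self._cond:
            self._pushed_seq += 1
            self._queue.append((self._pushed_seq, key, entry))
            self._cond.notify()

    def mark_processed(self, seq):
        # The drain thread calls this after a successful send; overflow
        # drops advance it too, so flush() never waits on entries that
        # will never reach the server.
        with self._cond:
            self._processed_seq = max(self._processed_seq, seq)
            self._cond.notify_all()

    def flush(self, timeout=None):
        # Snapshot the high-water mark, then wait until the drain thread
        # has processed past it. True if drained, False on timeout.
        with self._cond:
            target = self._pushed_seq
            return self._cond.wait_for(lambda: self._processed_seq >= target, timeout=timeout)
```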

lib/iris/tests/test_remote_log_handler.py

  • Replaced the _wait_for(lambda: created and created[0].pushes) polling in test_failures_always_deliver_via_retry with assert pusher.flush(timeout=5.0) — removes the xdist-load race that produced assert 2 == 1 / assert 6 == 1.
  • Replaced delivery-side _wait_for(delivered, 10.0) with assert pusher.flush(timeout=10.0) + a synchronous delivered() assertion.
  • New tests: test_log_pusher_flush_is_blocking (no polling after flush) and test_log_pusher_flush_timeout_returns_false (seeds a non-retryable error, flush beats the 0.5 s backoff).

On the Codex comments

  • P1 synchronous emit path — already off the emit path: push() just enqueues; all resolver/RPC work happens on the drain thread. No change needed, the drain-thread architecture already fixes this.
  • P2 close-path late resolve — with blocking flush(), callers can flush before closing the controller client. close() itself stays best-effort per your directive.
  • P1 close-vs-send race — ignored per your directive.
  • P2 synchronous flush — addressed by this commit.

Test results

  • tests/test_remote_log_handler.py — 13 passed, then 260 passed under flake-finder (20× each)
  • tests/cluster/worker/ — 38 passed
  • ./infra/pre-commit.py --all-files --fix — clean (ruff, black, license, pyrefly, etc.)
  • Branch: iris-logpusher-resolver, commit bc7c9668f

Adds a per-entry monotonic seq to the queue. `flush(timeout=None)` snapshots
_pushed_seq and waits on the condvar until the drain has advanced
_processed_seq past it, where "processed" means either sent or
overflow-dropped. Dropped entries advance the marker so a flush doesn't
wait forever on entries that will never reach the server.

Makes task_attempt._cleanup's `self._log_pusher.flush()` actually wait for
final logs to ship, restoring the pre-rewrite contract. close() stays
best-effort per directive.

Also replaces the _wait_for polling in test_failures_always_deliver_via_retry
with deterministic flush(timeout=...) synchronization — removes the xdist-
load flake that showed up as assert 2==1 / 6==1 on the seed/seed-then-push
race.

Co-authored-by: Russell Power <rjpower@users.noreply.github.com>
@rjpower rjpower merged commit ca98d82 into main Apr 17, 2026
37 of 40 checks passed
@rjpower rjpower deleted the iris-logpusher-resolver branch April 17, 2026 18:43

Labels

agent-generated: Created by automation/agent


Development

Successfully merging this pull request may close these issues.

Iris - missing logs
